#!/usr/bin/env python3

# @license
# Copyright 2022-2025 Matter.js Authors
# SPDX-License-Identifier: Apache-2.0

"""Test Metadata Extractor

Collects test metadata from various sources in CHIP and produces a normalized JSON summary.  The format of the JSON blob
is defined in test-descriptor.ts in @matter/testing.

During build, the Dockerfile stores this as /lib/test-descriptor.json.  The @matter/testing harness then loads it at
runtime.
"""

import os
import json
import yaml
import sys
import glob
import inspect
import importlib
import re
import shutil
from matter.testing.matter_testing import MatterBaseTest, MatterTestConfig
from matter.testing.runner import generate_mobly_test_config

# Root of the YAML test suites; load_yaml() also reads manualTests.json and ciTests.json from here
YAML_DIR = "/src/app/tests/suites"
# Directory of Python test scripts; load_python() also reads test_metadata.yaml from here
PYTHON_DIR = "/src/python_testing"

# JS test harness is higher level than CHIP CI so we ignore boilerplate arguments.  When one of these options appears
# in a run's "script-args", both the option and the single argument that follows it are dropped
PYTHON_IGNORE_ARGS = [
    "--storage-path",
    "--commissioning-method",
    "--discriminator",
    "--passcode",
    "--trace-to",
    "--PICS",
    "--app-pipe",
]

# Somewhere in the code we run for test reflection CHIP decides to create a directory for the test name.  So execute in
# /tmp/metadata-garbage and clean up when we're done.  A stale directory left behind by a run that died before cleanup
# is reused instead of aborting with FileExistsError
os.makedirs("/tmp/metadata-garbage", exist_ok=True)
os.chdir("/tmp/metadata-garbage")

# Splits a YAML step label of the form "Step <id>: <text>" (case-insensitive) into the id (group 1) and text (group 2)
step_pattern = re.compile(r"Step ([^)]+):\s*(.*)", re.IGNORECASE)

def read_yaml_file(path):
    """Parse the YAML document stored at ``path`` and return the resulting Python object.

    The file is decoded as UTF-8 regardless of the locale the script happens to run under.
    """
    # NOTE(review): full_load can construct more than plain scalars and collections.  The files read here come from
    # the CHIP source tree and are assumed to be trusted; do not point this at untrusted input
    with open(path, encoding="utf-8") as stream:
        return yaml.full_load(stream)

def read_json_file(path):
    """Parse the JSON document stored at ``path`` and return the resulting Python object.

    The file is decoded as UTF-8 regardless of the locale the script happens to run under.
    """
    with open(path, encoding="utf-8") as stream:
        return json.load(stream)

# Flat list of test descriptors (dicts); load_yaml() and load_python() append to it before it is sorted and bucketed
tests = []

def _yaml_step(test):
    """Build the step descriptor for one entry of a YAML test's "tests" list."""
    step = {"kind": "step"}

    # Labels of the form "Step <id>: <text>" are split into a name and a description; anything else is used verbatim
    label = test["label"]
    match = step_pattern.match(label)
    if match:
        step["name"] = match[1]
        step["description"] = match[2]
    else:
        step["name"] = label

    if test.get("disabled"):
        step["isDisabled"] = True

    # Step PICS use C-style operators; the descriptor uses the single-character forms
    pics = test.get("PICS")
    if pics:
        step["pics"] = pics.replace("&&", "&").replace("||", "|")

    return step

def load_yaml():
    """Load metadata from YAML tests.

    This is fairly straightforward as the tests are inherently declarative.  Every ``*.yaml`` file below YAML_DIR,
    except those under an ``examples`` directory and files named ``PICS.yaml``, contributes one descriptor to the
    module-level ``tests`` list.  A test whose steps are all disabled (or that has no steps) is marked as kind
    "manual" instead of "yaml".
    """
    # Map each test listed in manualTests.json to the category it is listed under
    categories = {
        test_name: category_name
        for category_name, members in read_json_file(f"{YAML_DIR}/manualTests.json").items()
        for test_name in members
    }

    for path in glob.glob(f"{YAML_DIR}/**/*.yaml", recursive=True):
        if "/examples/" in path or path.endswith("/PICS.yaml"):
            continue

        definition = read_yaml_file(path)
        basename = os.path.basename(path)
        stem = os.path.splitext(basename)[0]

        props = {
            "kind": "yaml",
            "name": stem.removeprefix("Test_").removeprefix("Test").removeprefix("TC_"),
            "path": path,
        }

        if "name" in definition:
            props["description"] = definition["name"]

        # NOTE(review): the suite lists appear to name tests by file stem (no ".yaml") - confirm.  The full file name
        # is still accepted so either form resolves
        category = categories.get(stem, categories.get(basename))
        if category is not None:
            props["category"] = category

        if "config" in definition:
            props["config"] = definition["config"]

        # Top-level PICS may be a single string or a list of strings; a bare string must not be iterated per character
        pics = definition.get("PICS")
        if pics is not None:
            if isinstance(pics, str):
                pics = [pics]
            props["pics"] = " & ".join(f"({item})" if " " in item else item for item in pics)

        steps = [_yaml_step(test) for test in definition.get("tests") or []]
        if steps:
            props["members"] = steps

        # With nothing enabled to execute (including no steps at all) the test can only be performed manually
        if all(step.get("isDisabled") for step in steps):
            props["kind"] = "manual"

        tests.append(props)

# Splits a command line into whitespace-separated tokens; a quoted run (either quote style) may contain whitespace
_ARG_PATTERN = re.compile(r"((?:(?<!\\)'.*?(?<!\\)'|(?<!\\)\".*?(?<!\\)\"|\S)+)")

# Splits one token into its unquoted runs and its quoted strings (quotes included)
_ARG_PART_PATTERN = re.compile(r"((?:[^'\"\\]|\\'|\\\")+|(?<!\\)'.*?(?<!\\)'(?<!\\)|\".*?(?<!\\)\")")

_QUOTES = ("'", '"')

def _unquote_part(part):
    """Strip the quotes from a quoted part and resolve its backslash escapes; unquoted parts pass through."""
    if not part.startswith(_QUOTES):
        return part

    # Encoding via latin-1 with backslashreplace keeps non-ASCII text intact through the unicode_escape round trip;
    # encoding as UTF-8 would have each multi-byte character decoded as several latin-1 characters
    return part[1:-1].encode("latin-1", "backslashreplace").decode("unicode_escape")

def parse_command_line(str: str):
    """Split a shell-like command line into a list of arguments.

    Arguments are separated by whitespace.  Single- or double-quoted sections may contain whitespace; their quotes
    are removed and backslash escapes inside them are resolved, whether the quoted section is the whole argument
    (``'a b'``) or only part of it (``key:'a b'``).  This is rudimentary unescaping, not full shell parsing.

    Returns the list of arguments, which is empty for an empty or all-whitespace command line.
    """
    # Some scripts have strings with spaces.  Currently only use single quotes but try to handle both that and double
    args = _ARG_PATTERN.findall(str)
    for index, arg in enumerate(args):
        parts = _ARG_PART_PATTERN.findall(arg)

        # Rebuild the argument when it contains a quoted section, or when it splits into several parts
        if len(parts) > 1 or any(part.startswith(_QUOTES) for part in parts):
            args[index] = "".join(_unquote_part(part) for part in parts)

    return args

def _read_ci_arguments_header(path):
    """Return the lines between the CI TEST ARGUMENTS markers of a test script, with a leading "# " removed.

    The result is empty when the file has no BEGIN marker.  If the END marker is missing, everything after the BEGIN
    marker is returned.
    """
    header = []
    with open(path, encoding="utf-8") as source:
        for line in source:
            if "=== BEGIN CI TEST ARGUMENTS ===" in line:
                break
        else:
            return header

        for line in source:
            if "=== END CI TEST ARGUMENTS ===" in line:
                break
            header.append(line.removeprefix("# "))

    return header

def _split_script_args(script_args):
    """Split a run's "script-args" string into ``(selected_tests, remaining_args)``.

    Names following ``--tests`` (up to the next option) or ``--test-case`` (exactly one) are the selected tests.
    Options in PYTHON_IGNORE_ARGS are dropped together with the argument that follows them.  Everything else is
    kept, in order, as the remaining arguments.
    """
    selected_tests = []
    remaining_args = []
    following = None
    for arg in parse_command_line(script_args):
        if following == "--tests":
            if arg.startswith("-"):
                # The test list ended; handle this argument as a regular option below
                following = None
            else:
                selected_tests.append(arg)
                continue
        elif following == "--test-case":
            selected_tests.append(arg)
            following = None
            continue
        elif following == "ignored":
            following = None
            continue

        if arg in PYTHON_IGNORE_ARGS:
            following = "ignored"
            continue

        if arg == "--tests" or arg == "--test-case":
            following = arg
            continue

        remaining_args.append(arg)

    return selected_tests, remaining_args

def _normalize_app_name(app):
    """Reduce an app reference such as "${CHIP_FOO_BAR_APP}" or "${FOO_BAR_APP}" to the form "foo-bar"."""
    return (
        app
            .removeprefix("${")
            .removeprefix("CHIP_")
            .removesuffix("}")
            .removesuffix("_APP")
            .replace("_", "-")
            .lower()
    )

def load_python():
    """Load metadata from Python tests.  This requires a bit more fiddling because:

     * Most metadata is only available from instantiated Python classes

     * We can't rely entirely on code convention due to errors.  Namely, the framework doesn't care if the test class is
       named the same as the file.  It almost always is but some tests have the wrong class name

     * Metadata fields have the test name in them.  So it's not just test.pics, but test.pics_NAME_1_2()

     * Some metadata is embedded in YAML embedded in a comment at the head of the file

     * Some metadata lives in a separate YAML file

     * Some support logic is exposed as API so we can use it, but some is embedded in Mobly or slightly mismatched for
       our purposes

    Every module in PYTHON_DIR is imported, then every class derived from MatterBaseTest is instantiated and one
    descriptor per test method (or one suite per test method with several CI runs) is appended to ``tests``.
    """
    sys.path.append(PYTHON_DIR)

    test_metadata = read_yaml_file(f"{PYTHON_DIR}/test_metadata.yaml")
    manual = {test["name"]: test["reason"] for test in test_metadata["not_automated"]}
    slow = {test["name"]: test["duration"] for test in test_metadata["slow_tests"]}

    matter_test_config = MatterTestConfig()
    mobly_test_config = generate_mobly_test_config(matter_test_config)

    # Importing each script is what makes its test classes discoverable via __subclasses__() below
    for path in glob.glob(f"{PYTHON_DIR}/*.py"):
        module_name = os.path.splitext(os.path.basename(path))[0]
        try:
            importlib.import_module(module_name)
        except ModuleNotFoundError as e:
            # Skip files with whitelisted errors.  Do not skip all errors or build problems may go unnoticed
            message = str(e)
            if "No module named 'matter.webrtc'" in message:
                # We neither test nor install webrtc support.  Test does not fail gracefully
                pass
            elif "No module named 'chip." in message:
                # SHA e8a92c8f1037d134d8f477cdc238d2f3a8a42f8b brings in file that references obsolete package names
                pass
            else:
                raise

    def load_class(test_class):
        """Append descriptors for every test method of ``test_class``, then recurse into its subclasses."""
        path = inspect.getfile(test_class)
        basename = os.path.basename(path)
        header = _read_ci_arguments_header(path)

        # Instantiate the test for introspection purposes
        instance = test_class(mobly_test_config)
        all_test_methods = instance.get_existing_test_names()

        # Extract run information from the YAML header.  Much of this is encoded as command line arguments.  An empty
        # header parses to None, and a header without "test-runner-runs" is treated the same as no header
        parsed_header = yaml.full_load("".join(header))
        runs = parsed_header.get("test-runner-runs") if isinstance(parsed_header, dict) else None

        # Maps each run to the test methods it executes; the run configurations are normalized in place
        run_tests = {}
        for run, config in (runs or {}).items():
            tests_in_run = []

            script_args = config.get("script-args")
            if script_args:
                tests_in_run, args = _split_script_args(script_args)
                if args:
                    config["script-args"] = args
                else:
                    del config["script-args"]

            # A run that does not select specific tests executes all of them
            run_tests[run] = tests_in_run or all_test_methods

            if "app" in config:
                config["app"] = _normalize_app_name(config["app"])

        # Add a test for every test/run pair
        for test_method_name in all_test_methods:
            props = {
                "kind": "py",
                "name": test_method_name.removeprefix("test_").removeprefix("TC_"),
                "path": path,
                "subpath": test_method_name,
            }

            # Per-test hooks are looked up by test method name, same as the PICS lookup.
            # NOTE(review): the framework is understood to build the hook name by stripping only "test_", so the
            # prettified name (which also loses "TC_") would never resolve - confirm against MatterBaseTest

            # Extract description if present; the lookup echoes its argument when the test defines none
            desc = instance.get_test_desc(test_method_name)
            if desc != test_method_name:
                props["description"] = desc

            # Extract PICS if present
            pics = instance.get_test_pics(test_method_name)
            if pics:
                props["pics"] = " & ".join(pics)

            # Extract steps if present
            steps = instance.get_defined_test_steps(test_method_name)
            if steps:
                if not steps[0].is_commissioning:
                    props["startUncommissioned"] = True

                props["members"] = [
                    {
                        "name": step.test_plan_number,
                        "kind": "step",
                        "description": step.description,
                    }
                    for step in steps
                ]

            # Integrate external metadata
            if basename in slow:
                props["isSlow"] = slow[basename]
            if basename in manual:
                props["isManual"] = manual[basename]

            # Without CI runs the test is installed as is
            if not runs:
                tests.append(props)
                continue

            # Install a test for each run configured to invoke the test; if there are multiple runs we create a suite
            # with a test for each run, otherwise we just create the test
            method_runs = {run: config for run, config in runs.items() if test_method_name in run_tests[run]}
            if not method_runs:
                continue

            suite = None
            if len(method_runs) > 1:
                suite = {"kind": "suite", "path": props["path"], "name": props["name"], "members": []}
                tests.append(suite)

            for run, config in method_runs.items():
                entry = {**props, "config": config}
                if suite is None:
                    tests.append(entry)
                    continue

                # Suite members are named after their run; a run is not required to name an app
                entry["name"] = run
                if "app" in config:
                    entry["app"] = config["app"]
                suite["members"].append(entry)

        # Recurse into derivatives
        load_python_classes(test_class)

    # A class reachable through more than one base must only be loaded once
    visited = set()

    def load_python_classes(base_class):
        """Load every not-yet-visited direct subclass of ``base_class``."""
        for derived in base_class.__subclasses__():
            if derived in visited:
                continue
            visited.add(derived)
            load_class(derived)

    load_python_classes(MatterBaseTest)

# Collect the metadata.  The scratch directory is removed even when loading fails so a failed run leaves no debris
try:
    load_yaml()
    load_python()
finally:
    shutil.rmtree("/tmp/metadata-garbage")

# Normalize/prettify names and create semantic version for sorting.  A name of the form "<parent>_<major>_<minor>",
# optionally followed by a separator and a tail, becomes "<major>.<minor>[.<tail>]" under the suite "<parent>"
semantic_name_pattern = re.compile(r"(.*)_(\d+)_(\d+)(?:[_\\-](.*))?")
for test in tests:
    match = semantic_name_pattern.match(test["name"])
    if match:
        parent_name, major, minor, tail = match.groups()
        suffix = f".{tail}" if tail is not None else ""
        test["parent"] = parent_name
        test["name"] = f"{major}.{minor}{suffix}"
        # Zero-pad the numbers so that a plain string sort orders versions numerically
        test["semantic_name"] = f"{parent_name.lower()} {int(major):04d}.{int(minor):04d}{suffix}"
    else:
        test["semantic_name"] = test["name"].lower()

# Order by semantic name
tests = sorted(tests, key=lambda test: test["semantic_name"])

# Organize into buckets and strip out organizational fields.  Tests with a parent are grouped into one suite per
# parent, created at the position of its first member; all other tests stay at the top level
parents = {}
members = []
for test in tests:
    del test["semantic_name"]

    parent_name = test.pop("parent", None)
    if parent_name is None:
        members.append(test)
        continue

    parent = parents.get(parent_name)
    if parent is None:
        parent = parents[parent_name] = {"name": parent_name, "kind": "suite", "members": []}
        members.append(parent)

    parent["members"].append(test)

# Write the result
print(json.dumps({"name": "chip", "format": 2, "kind": "suite", "members": members}))
